package net.gdface.facelog;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import gu.dtalk.Ack;
import gu.dtalk.cmd.CmdManager;
import gu.dtalk.cmd.TaskManager;
import gu.simplemq.IProducer;
import gu.simplemq.IPublisher;
import gu.simplemq.ISubscriber;
import gu.simplemq.Channel;
import gu.simplemq.IMessageAdapter;
import gu.simplemq.exceptions.SmqUnsubscribeException;
import gu.simplemq.json.BaseJsonEncoder;
import static com.google.common.base.Preconditions.*;

/**
 * Package-private helper that issues dtalk device commands and tasks through
 * a {@link CmdManager} and a {@link TaskManager} built on the message queue
 * factory supplied by {@link MessageQueueManagement}.
 */
class DtalkCmd {
	/** Timeout (seconds) used by {@link #doRunTaskSync} when the caller passes a non-positive value. */
	private static final int DEFAULT_TASK_TIMEOUT_SECS = 4;
	private final TokenMangement tm;
	private final CmdManager cmdManager;
	private final TaskManager taskManager;
	private final ISubscriber subscriber;

	/**
	 * Builds the command and task managers from the publisher, subscriber and producer
	 * provided by the factory of {@code mm}.
	 * @param mqParameters message queue parameters; the {@link MQParam#CMD_CHANNEL} entry is passed to the command manager
	 * @param tm token management used to apply command serial numbers and ack channels
	 * @param mm message queue management that supplies the publisher/subscriber/producer factory
	 */
	DtalkCmd(Map<MQParam, String> mqParameters,TokenMangement tm, MessageQueueManagement mm) {
		this.tm = tm;
		IPublisher publisher = mm.getFactory().getPublisher();
		this.subscriber = mm.getFactory().getSubscriber();
		IProducer producer = mm.getFactory().getProducer();
		this.cmdManager = new CmdManager(publisher, subscriber,mqParameters.get(MQParam.CMD_CHANNEL));
		this.taskManager = new TaskManager(publisher, subscriber,producer);
	}

	/**
	 * (Asynchronous) Runs the device command designated by {@code cmdpath}.<br>
	 * {@code null} elements of {@code target} are dropped before the command is sent.
	 * @param target ids of the target devices or device groups
	 * @param group whether the elements of {@code target} are device group ids
	 * @param cmdpath dtalk path of the device command
	 * @param jsonArgs device command arguments (JSON)
	 * @param ackChannel channel for the command response, may be {@code null}
	 * @param id id of the user running the command
	 * @return JSON string of a map holding the number of clients that received the command
	 *         and the command serial number, e.g. {"client":25,"cmdSn":12309898}
	 * @throws IllegalArgumentException if {@code target} is {@code null} or has no non-null element
	 */
	String doRunCmd(List<Integer>target,boolean group,String cmdpath,String jsonArgs,String ackChannel,int id){
		checkArgument(target != null,"target is null");
		// drop every null element
		target = Lists.newArrayList(Iterables.filter(target, Predicates.notNull()));
		checkArgument(!target.isEmpty(),"target list is empty");
		int cmdSn = tm.applyCmdSn(id);
		cmdManager.targetBuilder()
			.setTarget(target, group)
			.setCmdSn(cmdSn)
			.setAckChannel(ackChannel);
		int c = cmdManager.runCmd(cmdpath, BaseJsonEncoder.getEncoder().fromJson(jsonArgs,JSONObject.class));
		return BaseJsonEncoder.getEncoder().toJsonString(ImmutableMap.of("client",c,"cmdSn",cmdSn));
	}

	/**
	 * (Asynchronous) Submits the task designated by {@code cmdpath} to a task queue.<br>
	 * @param taskQueue name of the task queue
	 * @param cmdpath dtalk path of the device command
	 * @param jsonArgs device command arguments (JSON)
	 * @param ackChannel channel for the command response, may be {@code null}
	 * @param id id of the user running the command
	 * @return the command serial number if the task was submitted, otherwise {@code null}
	 */
	Integer doRunTask(String taskQueue,String cmdpath,String jsonArgs,String ackChannel,int id){
		int cmdSn = tm.applyCmdSn(id);
		taskManager.targetBuilder()
			.setCmdSn(cmdSn)
			.setAckChannel(ackChannel);
		boolean ret = taskManager
				.setTaskQueue(taskQueue)
				.setCmdpath(cmdpath)
				.runCmd(BaseJsonEncoder.getEncoder().fromJson(jsonArgs,JSONObject.class));
		return ret ? cmdSn : null;
	}

	/**
	 * (Synchronous) Runs the task designated by {@code cmdpath}.<br>
	 * Wraps the asynchronous {@link #doRunTask(String, String, String, String, int)}
	 * as a blocking call: an ack channel is applied and subscribed, the task is submitted,
	 * and the calling thread waits until the first response arrives or the timeout elapses.
	 * If the calling thread is interrupted while waiting, its interrupt status is restored
	 * and {@code null} is returned.
	 * @param taskQueue name of the task queue
	 * @param cmdpath dtalk path of the device command
	 * @param jsonArgs device command arguments (JSON)
	 * @param timeoutSecs task timeout in seconds, the longest time to wait for the task;
	 *                    a value {@code <= 0} selects the default
	 * @param id id of the user running the command
	 * @return the device command response, or {@code null} on timeout or interruption
	 * @throws IllegalStateException if the task could not be submitted
	 *                    (possibly because no device executes the task)
	 */
	@SuppressWarnings("rawtypes")
	Ack doRunTaskSync(String taskQueue, String cmdpath, String jsonArgs, int timeoutSecs, int id) {
		if(timeoutSecs <= 0){
			timeoutSecs = DEFAULT_TASK_TIMEOUT_SECS;
		}
		final String ackChannelName = tm.applyAckChannel(id, timeoutSecs);
		// Holds the response and doubles as the monitor the caller waits on.
		final AtomicReference<Ack> ack = new AtomicReference<>();
		Channel<Ack> ackChannel = new Channel<Ack>(ackChannelName,new IMessageAdapter<Ack>(){

			@Override
			public void onSubscribe(Ack t) throws SmqUnsubscribeException {
				// Publish the response and notify under the same monitor the waiter
				// checks under, so the notification cannot slip between check and wait.
				synchronized (ack) {
					ack.set(t);
					ack.notifyAll();
				}
				// the exception tells the calling layer to cancel the channel subscription
				throw new SmqUnsubscribeException(true);
			}}){};

		subscriber.register(ackChannel);
		try {
			Integer cmdSn = doRunTask(taskQueue,cmdpath,jsonArgs,ackChannelName,id);
			checkState(cmdSn != null,"FAIL TO SUBMIT TASK");
			// long arithmetic: timeoutSecs * 1000 in int overflows for large timeouts
			final long deadlineNanos = System.nanoTime() + timeoutSecs * 1000000000L;
			synchronized (ack) {
				// loop guards against spurious wakeups
				while(ack.get() == null){
					long remainingMillis = (deadlineNanos - System.nanoTime()) / 1000000L;
					if(remainingMillis <= 0){
						// must not call wait(0): that would block without a timeout
						break;
					}
					ack.wait(remainingMillis);
				}
			}
			return ack.get();
		} catch (InterruptedException e) {
			// restore the interrupt status so callers up the stack can observe it
			Thread.currentThread().interrupt();
			return null;
		} finally {
			if(ack.get() == null){
				// No response was delivered (submit failure, timeout or interrupt), so the
				// callback never asked for unsubscription; drop the subscription here.
				subscriber.unregister(ackChannelName);
			}
		}
	}
}
